# Extract weekly and quarterly observations in each store-upc-quarter 
# JHL

# 1. List all tsv files in RMS (module x year), pick out weekly observations for each store x upc x yq along with quarterly aggregations for weighted average price and quantities 

# Packages
# Install any package from the list that is not yet present in the local library (CRAN US mirror).
list.of.packages <- c("folderfun", "data.table", "bit64", "lubridate", "foreign", "dplyr", "ggplot2")
new.packages <- list.of.packages[!(list.of.packages %in% installed.packages()[,"Package"])]
if(length(new.packages)) install.packages(new.packages,repos = "http://cran.us.r-project.org")

# Attach the packages.
# NOTE(review): attach order decides which package wins when two of them export the same name
# (data.table and lubridate presumably both provide year/quarter/wday) -- keep this order unless that is checked.
# NOTE(review): foreign, dplyr and ggplot2 are not referenced in the visible part of this script.
require(folderfun)
require(data.table)
require(dplyr)
require(foreign)
require(ggplot2)
require(lubridate)
require(bit64) # Read long integers
# NOTE(review): per the data.table documentation, 0 disables rounding of numeric (double) columns
# when joining, grouping or ordering -- confirm against the installed data.table version.
setNumericRounding(0)

# Setup paths
# Register one folder function per named location; each call makes the matching ff<name>() getter used below.
# NOTE(review): setff() presumably resolves each name from an R option or environment variable -- confirm in the folderfun docs.
setff("path_home")
setff("rms_move")
setff("rms_annual")

# Project root, plus the sprintf templates for the RMS inputs:
#   repository   -- filled with the year to get that year's movement-file directory
#   rms_ver_path -- filled with the year to get that year's rms_versions file
path_home <- ffpath_home()
repository <- ffrms_move()
rms_ver_path <- file.path(ffrms_annual(), "rms_versions_%d.tsv")

# Directory holding the cached file table, and the output template (filled with module and year)
do_path <- file.path(path_home, "do/data_prep/01_price_index")
save_dir <- file.path(path_home, "dta/nielsen/my_posted_module/%s_%s.rds")

# File table for all tsv files in RMS
# The table is built once (one row per file, years 2006-2015) and cached as an RData file in do_path;
# later runs only load the cache. Columns: file_name, size_id, year, id, module.
file_table_cache <- sprintf("%s/rms_0615_files.RData",do_path)
if (!file.exists(file_table_cache)) {
	# List every file below the movement directory of year `y`.
	#
	# y: year substituted into the `repository` template.
	# Returns a data.table with one row per file:
	#   file_name -- full path (year directory + path relative to it)
	#   size_id   -- position in the `du | sort -r -h` listing (1 = largest file)
	#   year      -- y
	#   id        -- random permutation of 1..n (fixed seed) so that parallel nodes have similar workloads
	#   module    -- part of the file's base name before the first "_"
	extract_files <- function(y) {
		year_dir <- sprintf(repository, y)

		# `find .` has to run inside the year directory so that paths come back as "./<relative path>".
		# Put the previous working directory back on exit so the rest of the script is unaffected.
		old_wd <- setwd(year_dir)
		on.exit(setwd(old_wd), add = TRUE)

		# Extract file names inside subdirectories, largest first
		# Example: "/project2/databases/nielsen/nielsen_extracts/RMS/2006/Movement_Files/0501_2006/1272_2006.tsv"
		du_lines <- system("find . -type f  -exec du -h {} + | sort -r -h", intern = TRUE)

		# Each line is "<size>\t./<relative path>"; keep the relative path.
		# fixed = TRUE so that "." is matched literally instead of as a regex wildcard.
		rel_paths <- vapply(
			strsplit(du_lines, split = "\t./", fixed = TRUE),
			function(parts) parts[2],
			character(1)
		)
		n_files <- length(rel_paths)

		# file.path() returns character(0) for an empty listing, so an empty directory yields a 0-row table
		full_paths <- file.path(year_dir, rel_paths)

		# Random id so that parallel nodes have similar workloads (same seed for every year)
		set.seed(10)
		data.table(
			file_name = full_paths,
			size_id = seq_len(n_files),
			year = rep(y, n_files),
			id = sample.int(n_files),
			# basename() works per file, so files at different directory depths all get their own name
			module = sub("_.*$", "", basename(full_paths))
		)
	}

	file.table <- rbindlist(lapply(2006:2015, extract_files))
	save(file.table, file = file_table_cache) # Save to RData for quick loading
}

# Always read the table back from the cache (restores the object `file.table`)
load(file_table_cache)

# For each year, read movement file (by module), pick out quantity-weighted average price (quarterly unit value) and posted price (weekly unit value) for a week in each quarter
#
# Work is split across a SLURM job array: within each year the files are cut into 100 percentile
# slices of the random `id`, and array task k processes slice k. One RDS file is written per
# module x year; outputs that already exist are skipped, so the job can be re-run safely.

# Sys.getenv() returns a string; convert it once and fail early with a clear message if it is unusable
n_chunks <- 100
myID <- suppressWarnings(as.numeric(Sys.getenv("SLURM_ARRAY_TASK_ID")))
if (is.na(myID) || myID < 1 || myID > n_chunks || myID %% 1 != 0) {
	stop("SLURM_ARRAY_TASK_ID must be a whole number between 1 and ", n_chunks, call. = FALSE)
}

for (y in 2006:2015) {
	# Select relevant year
	file.table.year <- file.table[year == y, ]
	setkey(file.table.year, id)

	# Percentile boundaries of id (101 cut points -> 100 slices).
	# Slices are half-open [lower, upper) so that an id sitting exactly on a boundary belongs to one
	# task only; the last slice also includes its upper bound (the largest id).
	quant <- quantile(unique(file.table.year$id), seq(0, 1, by = 0.01))
	chunk_lo <- quant[[myID]]
	chunk_hi <- quant[[myID + 1]]
	file.table.year <- file.table.year[id >= chunk_lo & (id < chunk_hi | myID == n_chunks), ]

	# Last week ending of each quarter of year y (identical for every file of the year, so built once).
	# Start from all days of the year, keep day-of-week 7 (Saturday under the default Sunday-first
	# numbering), then keep the latest such date per quarter as max_q.
	dates <- data.table(week_end = seq(ymd(paste(y, "0101")), ymd(paste(y, "1231")), by = 1))
	dates[, `:=`(dow = wday(week_end), year = year(week_end), quarter = quarter(week_end))]
	dates <- dates[dow == 7]
	dates[, dow := NULL]
	dates[, max_q := max(week_end), by = .(year, quarter)]
	dates <- dates[week_end == max_q]
	dates[, week_end := NULL]
	setkey(dates, year, quarter)

	# For each movement file, collapse to quarterly data
	for (k in seq_len(nrow(file.table.year))) {
		module_code <- file.table.year$module[k]
		out_file <- sprintf(save_dir, module_code, y)

		if (file.exists(out_file)) {
			cat(sprintf('Module %s Year %s Skip ', module_code, y))
			next
		}

		# Read file, merge in upc_ver_uc
		move <- fread(file.table.year$file_name[k])
		move[, week_end := ymd(week_end)]
		move[, panel_year := y]
		setkey(move, upc, panel_year)

		# Inner join on (upc, panel_year): rows without a match in the versions file are dropped.
		# The versions table is read per file and removed right after the join to keep memory free.
		rms_ver <- fread(sprintf(rms_ver_path, y, y))
		setkey(rms_ver, upc, panel_year)
		move <- rms_ver[move, nomatch = 0]
		rm(rms_ver)
		move[, year := panel_year]

		# Clean variables
		move[, `:=`(display = NULL, feature = NULL)]
		move[, `:=`(quarter = quarter(week_end), unit_price = price / prmult)]
		# Last available week of each store x upc x version x year x quarter key
		move[, `:=`(max_q_key = max(week_end)), by = .(store_code_uc, upc, upc_ver_uc, year, quarter)]
		move[, `:=`(price = NULL, prmult = NULL)]

		# Merge in quarter end week ends
		setkey(move, year, quarter)
		move <- dates[move, nomatch = 0] # Last week_end of each quarter added in, balanced

		# For weeks in the start of each quarter, replace with new units assuming uniform distribution of units across weeks, else use original units.
		# diff < 0 means the week ends on day 1-6 of its month, i.e. it began in the previous month;
		# month %% 3 == 1 restricts this to the first month of a quarter (Jan/Apr/Jul/Oct).
		# Such a week keeps the share day(week_end)/7 = (7 + diff)/7 of its units.
		move[, `:=`(diff = day(week_end) - 7)]
		move[, new_units := ifelse(diff < 0 & month(week_end) %% 3 == 1, units * (7 + diff) / 7, units)]

		# Create observations for end of each quarter for all starting weeks of quarters.
		# week_end - 7 - diff equals week_end - day(week_end), the last day of the previous month;
		# the remaining share -diff/7 of the units is booked there, with year/quarter recomputed.
		move_end <- move[diff < 0 & month(week_end) %% 3 == 1]
		move_end[, `:=`(week_end = week_end - 7 - diff, new_units = units * (-diff) / 7)]
		move_end[, `:=`(year = year(week_end), quarter = quarter(week_end))]

		# Merge end observations back into main file
		move <- rbindlist(list(move, move_end))
		move[, `:=`(diff = NULL, units = NULL)]

		# Collapse units and prices (purchased price) to quarter (recollapse when merged across years to include starting obs. from other years)
		move[, `:=`(Units = sum(new_units), Price = sum(unit_price * new_units) / sum(new_units)), by = .(store_code_uc, upc, upc_ver_uc, year, quarter)]
		move[, new_units := NULL]

		# Keep last available observation in each key, include observations not in panel year
		move <- move[max_q_key == week_end | panel_year != year]
		move[, max_q_ind := ifelse(max_q_key == max_q, 1, 0)] # Indicator for whether kept observation was last week of every quarter
		move[, `:=`(max_q = NULL, max_q_key = NULL, week_end = NULL)]

		saveRDS(move, out_file)
		cat(sprintf('Module %s Year %s Done ', module_code, y))
	}
}